package com.dahuan.tables;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

/**
 * Demo job: registers a CSV file as a table, runs a few example queries on it
 * with both the Table API and SQL, and writes the result of one SQL query to
 * a second CSV file.
 *
 * <p>Usage: {@code Table_FileOutput [inputPath [outputPath]]}. Either argument
 * may be omitted, in which case the corresponding default path below is used.
 */
public class Table_FileOutput {

    /** Input file used when no input path is given on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "E:\\Project\\FlinkTutorials\\Flink-Scala\\src\\main\\resources\\sensor.txt";

    /** Output file used when no output path is given on the command line. */
    private static final String DEFAULT_OUTPUT_PATH =
            "E:\\Project\\FlinkTutorials\\Flink-Scala\\src\\main\\resources\\out.txt";

    /**
     * Builds and runs the job.
     *
     * @param args optional: {@code args[0]} overrides the input file path,
     *             {@code args[1]} overrides the output file path
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // Fall back to the original hard-coded locations when no paths are supplied.
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;
        String outputPath = (args != null && args.length > 1) ? args[1] : DEFAULT_OUTPUT_PATH;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source table: three columns (id, timestamp, temp) read from the input file as CSV.
        registerCsvTable(
                tableEnv,
                inputPath,
                "inputTable",
                new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE()));

        // Obtain a Table object for the table that was just registered.
        Table inputTable = tableEnv.from("inputTable");

        // --- Query examples -------------------------------------------------
        // Only resultSQL is written to the sink below. The other tables are not
        // emitted anywhere; they are kept as examples of the equivalent Table API
        // and aggregation syntax (building them still validates the expressions).

        // 1. Table API: simple projection + filter.
        Table resultTable = inputTable.select("id,temp")
                .filter("id === 'sensor_6'");

        // Table API: per-id aggregation.
        Table aggTable = inputTable.groupBy("id")
                .select("id,id.count as count,temp.avg as avgTemp");

        // 2. SQL: the same projection/filter and aggregation.
        String sql1 = "select id,temp from inputTable where id = 'sensor_6'";
        String sql2 = "select id,count(id) as cnt,avg(temp) as avgTemp from inputTable group by id";
        Table resultSQL = tableEnv.sqlQuery(sql1);
        // NOTE(review): aggSQL is presumably not written out because a grouped
        // aggregate yields an updating result that a plain file sink cannot take
        // -- confirm against the Flink version in use.
        Table aggSQL = tableEnv.sqlQuery(sql2);

        // Sink table: two columns (id, temp) written to the output file as CSV.
        registerCsvTable(
                tableEnv,
                outputPath,
                "outputTable",
                new Schema()
                        .field("id", DataTypes.STRING())
                        .field("temp", DataTypes.DOUBLE()));

        // Write the filtered SQL result to the output file.
        resultSQL.insertInto("outputTable");

        env.execute("Table_FileOutput");
    }

    /**
     * Registers a temporary table backed by a CSV file on the file system.
     *
     * @param tableEnv  table environment in which the table is registered
     * @param path      path of the backing file
     * @param tableName name under which the table is registered
     * @param schema    column names and types of the table
     */
    private static void registerCsvTable(
            StreamTableEnvironment tableEnv, String path, String tableName, Schema schema) {
        tableEnv.connect(new FileSystem().path(path))
                // Format used to read from / write to the connector.
                .withFormat(new Csv())
                // Schema of the resulting table.
                .withSchema(schema)
                // Register the table described above under the given name.
                .createTemporaryTable(tableName);
    }
}
